package com.atguigu.flink.state.keyedstate;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
 * Created by Smexy on 2023/11/17
 *
 * Example: for each sensor id, output the three highest water-level (vc) values seen so far.
 */
public class Demo2_ListState {

    /**
     * Builds and runs the streaming job.
     *
     * <p>Pipeline: read text lines from the socket {@code hadoop102:8888}, convert each line
     * with {@link WaterSensorMapFunction}, key the stream by sensor id, and for every incoming
     * reading emit the three largest {@code vc} values recorded so far for that key.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("hadoop102", 8888)
           .map(new WaterSensorMapFunction())
           // Group by sensor id so the list state below is kept separately per sensor.
           .keyBy(WaterSensor::getId)
           .process(new KeyedProcessFunction<String, WaterSensor, String>() {

               // Keyed list state holding the (at most three) largest vc values for the current key.
               private ListState<Integer> top3Vc;

               /**
                * Obtains the handle to the "top3Vc" list state from the runtime context.
                * Runs when the task is created; any backed-up state is restored at this point.
                */
               @Override
               public void open(Configuration parameters) throws Exception {
                   top3Vc = getRuntimeContext().getListState(
                           new ListStateDescriptor<>("top3Vc", Integer.class));
               }

               /**
                * Adds the new reading to the state, recomputes the top three values in
                * descending order, emits them, and overwrites the state with that result.
                *
                * @param value the incoming sensor reading
                * @param ctx   context giving access to the current key
                * @param out   collector receiving one output line per input element
                */
               @Override
               public void processElement(WaterSensor value, Context ctx, Collector<String> out)
                       throws Exception {
                   top3Vc.add(value.getVc());

                   // Sort descending and keep the first three. The state never holds more than
                   // four values here (three kept + the one just added), so a sequential stream
                   // is used: a parallel stream would only add fork/join overhead and iterate
                   // the state from threads other than the task thread.
                   List<Integer> top3List = StreamSupport.stream(top3Vc.get().spliterator(), false)
                                                         .sorted(Comparator.reverseOrder())
                                                         .limit(3)
                                                         .collect(Collectors.toList());

                   out.collect(ctx.getCurrentKey() + "top3Vc:" + top3List);

                   // Overwrite the state so it is trimmed back to at most three values.
                   top3Vc.update(top3List);
               }
           })
           .print();

        try {
            env.execute();
        } catch (Exception e) {
            // NOTE(review): a failed job is only printed here and main() still returns
            // normally; consider propagating the failure if callers need to detect it.
            e.printStackTrace();
        }
    }
}
